/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.action.hadoop;

import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.SparkConfigurationService;
import org.jdom.Element;
import org.jdom.Namespace;

import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SparkActionExecutor extends JavaActionExecutor {
    public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain";
    public static final String SPARK_MASTER = "oozie.spark.master";
    public static final String SPARK_MODE = "oozie.spark.mode";
    public static final String SPARK_OPTS = "oozie.spark.spark-opts";
    public static final String SPARK_DEFAULT_OPTS = "oozie.spark.spark-default-opts";
    public static final String SPARK_JOB_NAME = "oozie.spark.name";
    public static final String SPARK_CLASS = "oozie.spark.class";
    public static final String SPARK_JAR = "oozie.spark.jar";
    public static final String MAPRED_CHILD_ENV = "mapred.child.env";
    private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir";
    private static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
    private static final String HADOOP_CLIENT_CONF_DIR = "HADOOP_CLIENT_CONF_DIR";

    public SparkActionExecutor() {
        super("spark");
    }

    @Override
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
        Namespace ns = actionXml.getNamespace();

        String master = actionXml.getChildTextTrim("master", ns);
        actionConf.set(SPARK_MASTER, master);

        String mode = actionXml.getChildTextTrim("mode", ns);
        if (mode != null) {
            actionConf.set(SPARK_MODE, mode);
        }

        String jobName = actionXml.getChildTextTrim("name", ns);
        actionConf.set(SPARK_JOB_NAME, jobName);

        String sparkClass = actionXml.getChildTextTrim("class", ns);
        if (sparkClass != null) {
            actionConf.set(SPARK_CLASS, sparkClass);
        }

        String jarLocation = actionXml.getChildTextTrim("jar", ns);
        actionConf.set(SPARK_JAR, jarLocation);

        if (master.startsWith("yarn")) {
            String resourceManager = actionConf.get(HADOOP_YARN_RM);
            Properties sparkConfig =
                    Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager);
            if (!sparkConfig.isEmpty()) {
                try (final StringWriter sw = new StringWriter()) {
                    sparkConfig.store(sw, "Generated by Oozie server SparkActionExecutor");
                    actionConf.set(SPARK_DEFAULT_OPTS, sw.toString());
                } catch (IOException e) {
                    LOG.warn("Could not propagate Spark default configuration!", e);
                }
            }
        }
        String sparkOpts = actionXml.getChildTextTrim("spark-opts", ns);
        if (!Strings.isNullOrEmpty(sparkOpts)) {
            actionConf.set(SPARK_OPTS, sparkOpts.toString().trim());
        }

        // Setting if SparkMain should setup hadoop config *-site.xml
        boolean setupHadoopConf = actionConf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR,
                ConfigurationService.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR));
        actionConf.setBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, setupHadoopConf);
        return actionConf;
    }

    @Override
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        super.setupLauncherConf(conf, actionXml, appPath, context);

        // Set SPARK_HOME environment variable on launcher job
        // It is needed since pyspark client checks for it.
        String sparkHome = "SPARK_HOME=.";
        String mapredChildEnv = conf.get("oozie.launcher." + MAPRED_CHILD_ENV);

        if (mapredChildEnv == null) {
            conf.set(MAPRED_CHILD_ENV, sparkHome);
            conf.set("oozie.launcher." + MAPRED_CHILD_ENV, sparkHome);
        } else if (!mapredChildEnv.contains("SPARK_HOME")) {
            conf.set(MAPRED_CHILD_ENV, mapredChildEnv + "," + sparkHome);
            conf.set("oozie.launcher." + MAPRED_CHILD_ENV, mapredChildEnv + "," + sparkHome);
        }
        return conf;
    }

    @Override
    public List<Class<?>> getLauncherClasses() {
        List<Class<?>> classes = new ArrayList<Class<?>>();
        try {
            classes.add(Class.forName(SPARK_MAIN_CLASS_NAME));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found", e);
        }
        return classes;
    }


    /**
     * Return the sharelib name for the action.
     *
     * @param actionXml action xml element
     * @return returns <code>spark</code>.
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        return "spark";
    }

    @Override
    protected void addActionSpecificEnvVars(Map<String, String> env) {
        env.put("SPARK_HOME", ".");
        setHadoopConfDirIfEmpty(env);
    }

    private void setHadoopConfDirIfEmpty(Map<String, String> env) {
        String envHadoopConfDir = env.get(HADOOP_CONF_DIR);
        if (StringUtils.isEmpty(envHadoopConfDir)) {
            String hadoopClientConfDirVariable = String.format("${%s}",HADOOP_CLIENT_CONF_DIR);
            LOG.debug("Setting {0} environment variable to {1}.", HADOOP_CONF_DIR, hadoopClientConfDirVariable);
            env.put(HADOOP_CONF_DIR, hadoopClientConfDirVariable);
        }
        else {
            LOG.debug( "Environment variable {0} is already set to {1}.", HADOOP_CONF_DIR, envHadoopConfDir);
        }
    }

    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        return launcherConf.get(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, SPARK_MAIN_CLASS_NAME);
    }

    @Override
    public String[] getShareLibFilesForActionConf() {
        return new String[] { "hive-site.xml" };
    }

}
